package com.zyf.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author Malegod_xiaofei
 * @create 2023-12-04-22:49
 */
object Spark20_RDD_Operator_Transform_AllByKey {

  def main(args: Array[String]): Unit = {

    val sparkconf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkconf)

    // TODO 算子 - Key - Value 类型

    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3)
      , ("b", 4), ("b", 5), ("a", 6)
    ), 2)

    rdd.reduceByKey(_ + _) // wordcount
    rdd.aggregateByKey(0)(_ + _, _ + _) // wordcount
    rdd.foldByKey(0)(_ + _) // wordcount
    rdd.combineByKey(v => v, (x: Int, y) => x + y, (x: Int, y: Int) => x + y) // wordcount

    /*
    reduceByKey:
      combineByKeyWithClassTag[V](
        (v: V) => v, // 第一个值不会参与计算
        func, // 分区内计算规则
        func  // 分区间计算规则
        )

    aggregateByKey:
      combineByKeyWithClassTag[U](
        (v: V) => cleanedSeqOp(createZero(), v),// 初始值和第一个 key 的 value 值进行的分区内数据操作
        cleanedSeqOp, // 分区内计算规则
        combOp        // 分区间计算规则
        )

    foldByKey:
      combineByKeyWithClassTag[V](
        (v: V) => cleanedFunc(createZero(), v), // 初始值和第一个 key 的 value 值进行的分区内数据操作
        cleanedFunc, // 分区内计算规则
        cleanedFunc  // 分区间计算规则
        )

    combineByKey:
      combineByKeyWithClassTag(
        createCombiner, // 相同 key 的第一条数据进行的处理函数
        mergeValue, // 表示分区内数据的处理函数
        mergeCombiners) // 表示分区间数据的处理逻辑


     */

    sc.stop()

  }

}
